{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import requests\n",
    "import codecs\n",
    "import re\n",
    "import nltk\n",
    "from pyspark import SparkConf, SparkContext\n",
    "from pyspark.sql import SparkSession, Row\n",
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import StringType\n",
    "from tqdm import tqdm"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import StructField, StructType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "if os.path.exists(\"./data/clean\"):\n",
    "    pass\n",
    "else:\n",
    "    os.mkdir(\"./data/clean\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "#Create SparkConf\n",
    "sparkConf =SparkConf().setAppName('DataClean').setMaster('local[*]')\n",
    "#Create SparkContext\n",
    "sc=SparkContext(conf=sparkConf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "f = codecs.open(\"./data/stopwords.txt\")\n",
    "stopwords = f.readlines()\n",
    "f.close()\n",
    "stopwords = [word.strip() for word in stopwords]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "def cut_and_filter(sentence):\n",
    "    sentence2 = re.sub('[^\\w ]','',sentence).lower()\n",
    "    cut_result = nltk.word_tokenize(sentence2)\n",
    "    result = [word for word in cut_result if word not in stopwords]\n",
    "    return \" \".join(result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def cut(sentence):\n",
    "    sentence2 = re.sub('[^\\w ]','',sentence).lower()\n",
    "    cut_result = nltk.word_tokenize(sentence2)\n",
    "    return \" \".join(cut_result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_columns(row):\n",
    "    processed_row = Row(genre= str(row.genre),\n",
    "                        sentence1= cut_and_filter(row.sentence1), \n",
    "                        sentence2= cut_and_filter(row.sentence2))\n",
    "    return processed_row"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_columns_without_filter(row):\n",
    "    processed_row = Row(genre= str(row.genre),\n",
    "                        sentence1= cut(row.sentence1), \n",
    "                        sentence2= cut(row.sentence2))\n",
    "    return processed_row"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 5/5 [00:33<00:00,  6.67s/it, processing with train.tsv]          \n"
     ]
    }
   ],
   "source": [
    "filenames = list(filter(lambda x: x.endswith(\".tsv\"),os.listdir(\"./data/MNLI/\")))\n",
    "with tqdm(filenames, total=len(filenames)) as t:\n",
    "    for filename in t:\n",
    "        t.set_postfix_str(\"processing with {}\".format(filename))\n",
    "        file = filename.split(\".\")[0]\n",
    "        df = spark.read.option(\"header\", \"true\").csv(\"./data/MNLI/{}\".format(filename), sep= \"\\t\")\n",
    "        df_filter = df.dropna().drop_duplicates().select([\"genre\", \"sentence1\", \"sentence2\"])\n",
    "        temp_rdd = df_filter.rdd.map(lambda x: process_columns(x)).map(lambda x:(\n",
    "            x.genre,x.sentence1,x.sentence2\n",
    "        ))\n",
    "        schema = StructType([\n",
    "            StructField('genre', StringType()),\n",
    "            StructField('sentence1', StringType()),\n",
    "            StructField('sentence2', StringType())])\n",
    "        df_new = spark.createDataFrame(temp_rdd, schema= schema)\n",
    "        df_new.write.csv(\"./data/clean/{}\".format(file), mode=\"overwrite\", sep=\"\\t\", header= True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "filename=\"train.tsv\"\n",
    "file = filename.split(\".\")[0]\n",
    "df = spark.read.option(\"header\", \"true\").csv(\"./data/MNLI/{}\".format(filename), sep= \"\\t\")\n",
    "df_filter = df.dropna().drop_duplicates().select([\"genre\", \"sentence1\", \"sentence2\"])\n",
    "temp_rdd = df_filter.rdd.map(lambda x: process_columns_without_filter(x)).map(lambda x:(\n",
    "    x.genre,x.sentence1,x.sentence2\n",
    "))\n",
    "schema = StructType([\n",
    "    StructField('genre', StringType()),\n",
    "    StructField('sentence1', StringType()),\n",
    "    StructField('sentence2', StringType())])\n",
    "df_new = spark.createDataFrame(temp_rdd, schema= schema)\n",
    "df_new.write.csv(\"./data/clean/nofilter_{}\".format(file), mode=\"overwrite\", sep=\"\\t\", header= True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
